{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "UserWarning: Thank you for using the Modin Experimental pandas API.\n",
      "Please note that some of these APIs deviate from pandas in order to provide improved performance.\n"
     ]
    }
   ],
   "source": [
    "# This notebook expects that Modin and Ray are installed, e.g. by `pip install modin[ray]`.\n",
    "# For all ways to install Modin see official documentation at:\n",
    "# https://modin.readthedocs.io/en/latest/installation.html\n",
    "\n",
    "# NOTE: this is special version for showing cloud-cluster functionality.\n",
    "# It requires installation of extra packages: `pip install cloudpickle rpyc`\n",
    "# Also if your environment requires proxy for SSH you need to expose it via MODIN_SOCKS_PROXY environment variable,\n",
    "# please note that it requires ray >= 0.8.7 to work\n",
    "import modin.experimental.pandas as pd\n",
    "from modin.experimental.cloud import create_cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "columns_names = [\n",
    "        \"trip_id\", \"vendor_id\", \"pickup_datetime\", \"dropoff_datetime\", \"store_and_fwd_flag\",\n",
    "        \"rate_code_id\", \"pickup_longitude\", \"pickup_latitude\", \"dropoff_longitude\", \"dropoff_latitude\",\n",
    "        \"passenger_count\", \"trip_distance\", \"fare_amount\", \"extra\", \"mta_tax\", \"tip_amount\",\n",
    "        \"tolls_amount\", \"ehail_fee\", \"improvement_surcharge\", \"total_amount\", \"payment_type\",\n",
    "        \"trip_type\", \"pickup\", \"dropoff\", \"cab_type\", \"precipitation\", \"snow_depth\", \"snowfall\",\n",
    "        \"max_temperature\", \"min_temperature\", \"average_wind_speed\", \"pickup_nyct2010_gid\",\n",
    "        \"pickup_ctlabel\", \"pickup_borocode\", \"pickup_boroname\", \"pickup_ct2010\",\n",
    "        \"pickup_boroct2010\", \"pickup_cdeligibil\", \"pickup_ntacode\", \"pickup_ntaname\", \"pickup_puma\",\n",
    "        \"dropoff_nyct2010_gid\", \"dropoff_ctlabel\", \"dropoff_borocode\", \"dropoff_boroname\",\n",
    "        \"dropoff_ct2010\", \"dropoff_boroct2010\", \"dropoff_cdeligibil\", \"dropoff_ntacode\",\n",
    "        \"dropoff_ntaname\", \"dropoff_puma\",\n",
    "    ]\n",
    "parse_dates=[\"pickup_datetime\", \"dropoff_datetime\"]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "To monitor auto-scaling activity, you can run:\n",
      "\n",
      "  ray exec /home/vnlitvinov/.modin/cloud/config-9aba3e92.yml 'tail -n 100 -f /tmp/ray/session_*/logs/monitor*'\n",
      "\n",
      "To open a console on the cluster:\n",
      "\n",
      "  ray attach /home/vnlitvinov/.modin/cloud/config-9aba3e92.yml\n",
      "\n",
      "To get a remote shell to the cluster manually, run:\n",
      "\n",
      "  ssh -o IdentitiesOnly=yes -i /home/vnlitvinov/.ssh/ray-autoscaler_2_eu-north-1.pem ubuntu@13.48.203.18\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "UserWarning: Was not able to intercept all numpy imports. To intercept all of these please do 'import modin.experimental.pandas' as early as possible\n"
     ]
    }
   ],
   "source": [
    "with create_cluster('aws', '../../../aws_credentials',\n",
    "                    cluster_name=\"rayscale-test\",\n",
    "                    region=\"eu-north-1\", zone=\"eu-north-1b\", image=\"ami-00e1e82d7d4ca80d3\") as remote:\n",
    "    df = pd.read_csv('https://modin-datasets.s3.amazonaws.com/trips_data.csv', names=columns_names,\n",
    "                    header=None, parse_dates=parse_dates)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "      trip_id  vendor_id  ...              dropoff_ntaname dropoff_puma\n",
      "0           1          2  ...                          NaN          NaN\n",
      "1           2          2  ...                          NaN          NaN\n",
      "2           3          2  ...                          NaN          NaN\n",
      "3           4          2  ...                          NaN          NaN\n",
      "4           5          2  ...                          NaN          NaN\n",
      "...       ...        ...  ...                          ...          ...\n",
      "9995     9881          2  ...             Hamilton Heights       3802.0\n",
      "9996     9882          2  ...     Washington Heights North       3801.0\n",
      "9997     9883          2  ...            East Harlem South       3804.0\n",
      "9998     9884          2  ...     Washington Heights South       3801.0\n",
      "9999     9885          2  ...  Lenox Hill-Roosevelt Island       3805.0\n",
      "\n",
      "[10000 rows x 51 columns]\n"
     ]
    }
   ],
   "source": [
    "with remote:\n",
    "    print(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def q1(df):\n",
    "    return df.groupby(\"cab_type\")[\"cab_type\"].count()\n",
    "def q2(df):\n",
    "    return df.groupby(\"passenger_count\", as_index=False).mean()[[\"passenger_count\", \"total_amount\"]]\n",
    "def q3(df):\n",
    "    return df.groupby([\"passenger_count\", \"pickup_datetime\"]).size().reset_index()\n",
    "def q4(df):\n",
    "    transformed = pd.DataFrame({\n",
    "        \"passenger_count\": df[\"passenger_count\"],\n",
    "        \"pickup_datetime\": df[\"pickup_datetime\"].dt.year,\n",
    "        \"trip_distance\": df[\"trip_distance\"].astype(\"int64\"),\n",
    "    })\n",
    "    return transformed.groupby([\"passenger_count\", \"pickup_datetime\", \"trip_distance\"])  \\\n",
    "            .size().reset_index().sort_values(by=[\"pickup_datetime\", 0], ascending=[True, False])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "10000\n",
      "   passenger_count  total_amount\n",
      "0                0     18.333333\n",
      "1                1     15.258850\n",
      "2                2     20.332356\n",
      "3                3     13.748845\n",
      "4                4     19.742688\n",
      "5                5     14.786221\n",
      "6                6     15.400085\n",
      "      passenger_count     pickup_datetime  0\n",
      "0                   0 2013-08-14 12:07:00  1\n",
      "1                   0 2013-08-14 12:37:00  1\n",
      "2                   0 2013-08-15 00:00:00  1\n",
      "3                   1 2013-08-01 08:14:37  1\n",
      "4                   1 2013-08-01 09:48:00  1\n",
      "...               ...                 ... ..\n",
      "9909                6 2013-09-28 18:30:15  1\n",
      "9910                6 2013-09-28 19:57:22  1\n",
      "9911                6 2013-09-29 18:47:29  1\n",
      "9912                6 2013-09-30 02:27:33  1\n",
      "9913                6 2013-09-30 21:31:06  1\n",
      "\n",
      "[9914 rows x 3 columns]\n",
      "     passenger_count  pickup_datetime  trip_distance     0\n",
      "2                  1             2013              0  1991\n",
      "3                  1             2013              1  1270\n",
      "4                  1             2013              2   853\n",
      "80                 5             2013              0   551\n",
      "81                 5             2013              1   537\n",
      "..               ...              ...            ...   ...\n",
      "77                 4             2013             10     1\n",
      "78                 4             2013             11     1\n",
      "79                 4             2013             14     1\n",
      "102                5             2013             28     1\n",
      "115                6             2013             14     1\n",
      "\n",
      "[116 rows x 4 columns]\n"
     ]
    }
   ],
   "source": [
    "with remote:\n",
    "    for query in (q1, q2, q3, q4):\n",
    "        print(query(df))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "license": "http://www.apache.org/licenses/LICENSE-2.0",
  "license_notice": [
   "Licensed to Modin Development Team under one or more contributor license agreements.",
   "See the NOTICE file distributed with this work for additional information regarding",
   "copyright ownership.  The Modin Development Team licenses this file to you under the",
   "Apache License, Version 2.0 (the \"License\"); you may not use this file except in",
   "compliance with the License.  You may obtain a copy of the License at",
   "",
   "    http://www.apache.org/licenses/LICENSE-2.0",
   "",
   "Unless required by applicable law or agreed to in writing, software distributed under",
   "the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF",
   "ANY KIND, either express or implied. See the License for the specific language",
   "governing permissions and limitations under the License."
  ],
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
